Example application of Spring integration websocket integration (below)

  • 2020-05-07 19:51:39
  • OfStack

In the Spring integration websocket integration application example (above), we have implemented websocket, but there is still one core business implementation class that is not implemented. Here we implement this business core class, because the system in which the old husband participates sends messages USES websocket, so the implementation is how to send messages.

7.NewsListenerImpl implementation


package cn.bridgeli.websocket;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.lagou.common.base.util.date.DateUtil;
import com.lagou.platform.news.api.enumeration.PlatNewsCategoryType;
import com.lagou.platform.news.web.dao.ext.model.PlatNewsVo;
import com.lagou.platform.news.web.dao.ext.model.SearchCondition;
import com.lagou.platform.news.web.quartz.impl.TimingJob;
import com.lagou.platform.news.web.service.PlatNewsService;
import org.apache.commons.lang.StringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description :  In-station message listener implementation 
* @Date : 16-3-7
*/
@Component
public class NewsListenerImpl implements NewsListener{
private static final Logger logger = LoggerFactory.getLogger(NewsListenerImpl.class);
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
// The thread pool 
private ExecutorService executorService = Executors.newCachedThreadPool();
// Task scheduling 
private SchedulerFactory sf = new StdSchedulerFactory();
@Autowired
private PlatNewsService platNewsService;
@Override
public void afterPersist(PlatNewsVo platNewsVo) {
logger.info(" Listen for a new message to add... ");
logger.info(" New message for :"+gson.toJson(platNewsVo));
// Starting a thread 
if(null != platNewsVo && !StringUtils.isBlank(platNewsVo.getCurrentoperatoremail())){
// If it's a timed message 
if(platNewsVo.getNewsType() == PlatNewsCategoryType.TIMING_TIME.getCategoryId()){
startTimingTask(platNewsVo); // Timed push 
}else{
// Immediately push 
executorService.execute(new AfterConnectionEstablishedTask(platNewsVo.getCurrentoperatoremail()));
}
}
}
@Override
public void afterConnectionEstablished(String email) {
logger.info(" To establish websocket Push new messages after connection... ");
if(!StringUtils.isBlank(email)){
executorService.execute(new AfterConnectionEstablishedTask(email));
}
}
/**
* @Description  :   If a timing message is newly added, the timing message task is started 
* @param platNewsVo
*/
private void startTimingTask(PlatNewsVo platNewsVo){
logger.info(" Start a regular push message task... ");
Date timingTime = platNewsVo.getTimingTime();
if(null == timingTime){
logger.info(" The timing message time is null . ");
return;
}
logger.info(" The timed push task time is: "+DateUtil.date2String(timingTime));
JobDetail jobDetail= JobBuilder.newJob(TimingJob.class)
.withIdentity(platNewsVo.getCurrentoperatoremail()+" Timing of the message "+platNewsVo.getId(), " Messages within the station ")
.build();
// Passing parameters 
jobDetail.getJobDataMap().put("platNewsService",platNewsService);
jobDetail.getJobDataMap().put("userEmail",platNewsVo.getCurrentoperatoremail());
Trigger trigger= TriggerBuilder
.newTrigger()
.withIdentity(" Timing message trigger "+platNewsVo.getId(), " Messages within the station ")
.startAt(timingTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(0) // The time interval 
.withRepeatCount(0) // Repeat the number 
)
.build();
// Start timing task 
try {
Scheduler sched = sf.getScheduler();
sched.scheduleJob(jobDetail,trigger);
if(!sched.isShutdown()){
sched.start();
}
} catch (SchedulerException e) {
logger.info(e.toString());
}
logger.info(" Complete the task of opening the regular push message... ");
}
/**
* @Description :  To establish websocket The push thread after the link 
*/
class AfterConnectionEstablishedTask implements Runnable{
String email ;
public AfterConnectionEstablishedTask(String email){
this.email = email;
}
@Override
public void run() {
logger.info(" Start pushing messages to users :"+email+" . ");
if(!StringUtils.isBlank(email)){
SearchCondition searchCondition = new SearchCondition();
searchCondition.setOperatorEmail(email);
JSONArray jsonArray = new JSONArray();
for(PlatNewsCategoryType type : PlatNewsCategoryType.values()){
searchCondition.setTypeId(type.getCategoryId());
int count = platNewsService.countPlatNewsByExample(searchCondition);
JSONObject object = new JSONObject();
object.put("name",type.name());
object.put("description",type.getDescription());
object.put("count",count);
jsonArray.add(object);
}
if(null != jsonArray && jsonArray.size()>0){
UserSocketVo userSocketVo = WSSessionLocalCache.get(email);
TextMessage reMessage = new TextMessage(gson.toJson(jsonArray));
try {
if(null != userSocketVo){
// Push message 
userSocketVo.getWebSocketSession().sendMessage(reMessage);
// Update push time 
userSocketVo.setLastSendTime(DateUtil.getNowDate());
logger.info(" Complete pushing new messages to users :"+userSocketVo.getUserEmail()+" . ");
}
} catch (IOException e) {
logger.error(e.toString());
logger.info(" Internal message push failed... "+e.toString());
}
}
}
logger.info(" End the push message to "+email+" . ");
}
}
}

This class is the implementation of the core business of websocket, which is definitely related to the business. Due to the different business, the implementation is definitely different, because the old man participates in the system of sending messages, so the most core sentence is:


userSocketVo.getWebSocketSession().sendMessage(reMessage);

Send our message through WebSocketSession's sendMessage method. In addition, this is mainly the implementation of the back end, as to the realization of the front end, because the old man is more concerned about the back end of the ape, so the front end is not much to do the introduction, you can go to the Internet to check information. The last thing to be noted is that when the old husband searched some learning materials before, he found that his colleague's writing method was almost the same as that of an article. I think the colleague should have referred to this article, so it is listed below as reference.


Related articles: