首页>代码>java实现即时消息提醒方法>/java推送/service/impl/RtcServiceImpl.java
/**
 * 文件名: RtcServiceImpl.java
 */
package com.gxzy.framework.comm.rtc.service.impl;

import java.io.IOException;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.ServletResponse;
import org.apache.log4j.Logger;
import org.apache.shiro.SecurityUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.gxzy.framework.comm.rtc.dao.RtcMessageDao;
import com.gxzy.framework.comm.rtc.entity.RtcMessage;
import com.gxzy.framework.comm.rtc.service.RtcService;
import com.gxzy.framework.comm.rtc.store.ConnectionStore;
import com.gxzy.framework.comm.rtc.store.MessageStore;
import com.gxzy.sys.base.dao.OnlineUserDao;
import com.gxzy.sys.base.entity.OnlineUser;
import com.gxzy.sys.base.service.OnlineUserService;
import com.gxzy.framework.util.S;
import com.gxzy.framework.util.bean.Pagination;
import com.gxzy.framework.util.persistence.QLBuilder;
import com.gxzy.framework.util.security.UserTool;
import com.gxzy.framework.util.serialize.JsonSerialize;

/**
 * 即时通讯服务实现类
 * 
 * @作者 AMY
 * @版本 1.0
 */
//@Repository(value = "commRtcService")
@Service
public class RtcServiceImpl implements RtcService
{
    private static Logger log = Logger.getLogger(RtcServiceImpl.class);

    @Resource(name = "commRtcMessageDao")
    private RtcMessageDao rtcMessageDao;

    @Resource(name = "sysOnlineUserDao")
    private OnlineUserDao onlineUserDao;
    
	@Autowired
	private OnlineUserService sysOnlineUserService;

	
    @Override
	public List<RtcMessage> list(String username, Integer limit) {
		QLBuilder ql = new QLBuilder();
		ql.and("recvUsername = :recvUsername", username);
		//ql.and("msgType!=:msgType","COMM_IM");
		//ql.add("order by state asc,createTime desc");
		ql.add("order by state asc ,createTime desc");
		List<RtcMessage> list = this.rtcMessageDao.selectManyLimitByJpql(ql,limit);
		return list;
	}

    
    
	@Override
	public Pagination<RtcMessage> list(Pagination<RtcMessage> page,String username) {
		QLBuilder ql = new QLBuilder();
		ql.and("recvUsername = :recvUsername", username);
		//ql.and("msgType!=:msgType","COMM_IM");
		ql.add("order by state asc,createTime desc");
		page = this.rtcMessageDao.selectPagedByJpql(ql, page);
		return page;
	}



	/*
     * (non-Javadoc)
     * 
     * @see
     * com.gxzy.oa.comm.rtc.service.RtcService#sendToOne(com.gxzy.oa.comm.rtc
     * .entity.RtcMessage)
     */
    @Override
    @Transactional
    public void sendToOne(RtcMessage msg)
    {
        
        String username = msg.getRecvUsername();
        //if (UserTool.getInstance().isOnline(msg.getRecvUsername()))
        if(this.sysOnlineUserService.isOnlineService(msg.getRecvUsername()))
        { 
            if (!MessageStore.contain(username))
            {
                Queue<RtcMessage> queue = new LinkedList<RtcMessage>();
                String jpql = "SELECT obj FROM RtcMessage AS obj WHERE obj.recvUsername=:username";
                List<RtcMessage> msgs = rtcMessageDao.selectManyByJpql(jpql,
                        username);
                if(msgs.size()>0){
                    queue.addAll(msgs);
                }

                // 提醒记录不主动删除,删除不需保存的
                /**
                if (msgs.size() > 0)
                {
                    for(RtcMessage m : msgs){
                    	if(!m.isPersistable())//不需要的删除
                        rtcMessageDao.delete(m.getMsgId());
                    }
                    
                }
                **/
                
                MessageStore.addMessageQueue(username, queue);
            }           
            MessageStore.putMessage(msg);          
            this.wakeupConnection(msg.getRecvUsername());
            if(msg.getPersistable()){//即使在线也保存
            	rtcMessageDao.insert(msg);
            }
        }else{//不在线事保存到数据库
        	rtcMessageDao.insert(msg);
        }
        //else if (msg.isPersistable())
        //{
        //无论是否在线都保存到数据库里
        //if(msg.isPersistable()){
            
        //}

       // }
    }

    /*
     * (non-Javadoc)
     * 
     * @see
     * com.gxzy.oa.comm.rtc.service.RtcService#sendToAllOnline(com.gxzy.oa.comm
     * .rtc.entity.RtcMessage)
     */
    @Override
    @Transactional
    public void sendToAllOnline(RtcMessage msg)
    {
    	//存在问题,插入数据时,可能会给单独用户插入多条
    	//新的方式查找在线用户    	
    	Map<String, List<AsyncContext>>  asys = ConnectionStore.getStore();  	
    	Date date = new Date();   	
    	for(String user:asys.keySet()){   		
    		 RtcMessage msgC = new RtcMessage();
             BeanUtils.copyProperties(msg, msgC);
             msgC.setRecvUsername(user);
             msgC.setCreateTime(date);            
             MessageStore.putMessage(msgC);            
             this.wakeupConnection(user);
             if(msg.getPersistable()){
                 this.rtcMessageDao.insert(msgC);            	 
             }
    	}
    	
    	/**
        List<OnlineUser> users = onlineUserDao
                .selectManyByJpql("SELECT obj FROM OnlineUser AS obj");

        if (users != null && users.size() > 0)
        {
            for (OnlineUser user : users)
            {
                if (UserTool.getInstance().isOnline(user))
                {
                    RtcMessage msgC = new RtcMessage();

                    BeanUtils.copyProperties(msg, msgC);
                    msgC.setRecvUsername(user.getUsername());
                    msgC.setCreateTime(new Date());

                    MessageStore.putMessage(msgC);
                    this.wakeupConnection(user.getUsername());
                }
            }
        }
        **/
    }

    /*
     * (non-Javadoc)
     * 
     * @see
     * com.gxzy.oa.comm.rtc.service.RtcService#sendToAllOthersOnline(com.gxzy
     * .oa.comm.rtc.entity.RtcMessage)
     */
    @Override
    @Transactional
    public void sendToAllOthersOnline(RtcMessage msg)
    {
    	
    	//新的方式查找在线用户	
    	Map<String, List<AsyncContext>>  asys = ConnectionStore.getStore();
    	Date date = new Date();
    	for(String user:asys.keySet()){  		
    		 RtcMessage msgC = new RtcMessage();
             BeanUtils.copyProperties(msg, msgC);
             msgC.setRecvUsername(user);
             /*
             msgC.setRecvRealname(UserTool.getInstance()
                     .getSubject(user);
             **/
             msgC.setCreateTime(date);             
             MessageStore.putMessage(msgC);            
             this.wakeupConnection(user); 
             if(msg.getPersistable()){
                 this.rtcMessageDao.insert(msgC);
             }

    	}
       
    	/**
        List<OnlineUser> users = onlineUserDao
                .selectManyByJpql("SELECT obj FROM OnlineUser AS obj");

        if (users != null && users.size() > 0)
        {
            //获取当前用户名
            String username = SecurityUtils.getSubject()
                    .getPrincipal().toString();
            
            for (OnlineUser user : users)
            {
                if ((!S.equal(user.getUsername(), username))
                        && UserTool.getInstance().isOnline(user))
                {
                    RtcMessage msgC = new RtcMessage();

                    BeanUtils.copyProperties(msg, msgC);
                    msgC.setRecvUsername(user.getUsername());
                    msgC.setRecvRealname(UserTool.getInstance()
                            .getSubject(user.getUsername()).getRealname());
                    msgC.setCreateTime(new Date());

                    MessageStore.putMessage(msgC);
                    this.wakeupConnection(user.getUsername());
                }
            }
        }
        **/
    }

    /*
     * (non-Javadoc)
     * 
     * @see
     * com.gxzy.oa.comm.rtc.service.RtcService#wakeupConnection(java.lang.String
     * )
     */
    @Override
    public void wakeupConnection(String key)
    {

        
        if (MessageStore.hasMessage(key) && ConnectionStore.contain(key))
        {
            // 取回异步请求
            List<AsyncContext> contexts = ConnectionStore.poll(key);
            
            //log.debug("context上下文: " + context.toString());
            
            // 取回即时消息
            List<RtcMessage> msgs = MessageStore.poll(key);            
            for(AsyncContext ac:contexts){
            	
                ServletResponse response = ac.getResponse();

                try
                {
                    response.getWriter().print(JsonSerialize.encode(msgs));
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    log.error("序列化错误:", e);
                }

                ac.complete();
            	
            }
            

            
            log.debug("发送信息结束");
        }
    }

    /*
     * (non-Javadoc)
     * 
     * @see
     * com.gxzy.oa.comm.rtc.service.RtcService#setupConnection(java.lang.String)
     */
    @Override
    @Transactional
    public void setupConnection(String username, AsyncContext context)
    {
   	
        ConnectionStore.put(username, context);
        // 如果消息仓库中不含有存储该用户的Queue类,那么视为是第一次,需要到数据库中读取消息
        if (!MessageStore.contain(username))
        {
            Queue<RtcMessage> queue = new LinkedList<RtcMessage>();

            String jpql = "SELECT obj FROM RtcMessage AS obj WHERE obj.recvUsername=:username and state = :state";

            List<RtcMessage> msgs = rtcMessageDao.selectManyByJpql(jpql,
                    username,0);

            if (msgs.size() > 0)
            {
                queue.addAll(msgs);
                
                //this.deleteMessages(username);
            }

            MessageStore.addMessageQueue(username, queue);
        }

        this.wakeupConnection(username);
    }

    /* (non-Javadoc)
     * @see com.gxzy.oa.comm.rtc.service.RtcService#deleteMessages(java.lang.String)
     */
    @Override
    //@Transactional(propagation=Propagation.REQUIRES_NEW)
    @Transactional
    public void deleteMessages(String username)
    {
        // TODO Auto-generated method stub
        rtcMessageDao.deleteByJpql("DELETE FROM RtcMessage  WHERE recvUsername=:username", username);
    }
    @Transactional
    public void  readMessages(String username){  	
    	this.rtcMessageDao.updateByJpql("update RtcMessage set state = 100   WHERE recvUsername=:username", username);
    }

	@Override
	@Transactional
	public void readMessage(String msgId) {		
		//this.rtcMessageDao.delete(msgId);
		this.rtcMessageDao.updateByJpql("update RtcMessage set state = 100   WHERE msgId=:msgId", msgId);
	}
	
	@Override
	@Transactional
	public void deleteMessage(List<String> msgId) {		
		//this.rtcMessageDao.delete(msgId);
		//this.rtcMessageDao.delete(msgId);
		QLBuilder ql = new QLBuilder();
		ql.and("msgId in (:id) and recvUsername = :recvUsername", msgId,
				UserTool.getInstance().getCurrentSubject().getUsername());
		this.rtcMessageDao.deleteByJpql(ql);
	}
       
}
最近下载更多
dapeng0011  LV15 8月25日
zgf099  LV1 2022年9月19日
hanweinan6  LV13 2022年9月16日
Jack261108  LV2 2022年5月6日
hnn0909  LV1 2022年4月2日
lyxtmy  LV1 2022年1月27日
msssg123  LV1 2022年1月25日
ruiqiujuice  LV1 2021年12月21日
密码烟雨城  LV1 2021年12月13日
cyx1314cmx 2021年11月13日
暂无贡献等级
最近浏览更多
dearxo2014  LV1 11月9日
dapeng0011  LV15 8月25日
西瓜哥哥  LV4 2023年12月17日
诚壹lllyz 2023年11月17日
暂无贡献等级
溪若白  LV1 2023年7月13日
Dominick  LV14 2023年6月19日
zzhua195  LV6 2023年5月29日
wuziayng1232  LV10 2023年2月21日
long123_356  LV7 2022年12月10日
BestClever  LV32 2022年12月5日
顶部 客服 微信二维码 底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友