Fix streaming server. Redis connection subscribe for each channel. (#3828)
This commit is contained in:
		
							parent
							
								
									bab5a18232
								
							
						
					
					
						commit
						d8ec832806
					
				| @ -115,7 +115,7 @@ const startWorker = (workerId) => { | |||||||
| 
 | 
 | ||||||
|   const subs = {}; |   const subs = {}; | ||||||
| 
 | 
 | ||||||
|   redisSubscribeClient.on('pmessage', (_, channel, message) => { |   redisSubscribeClient.on('message', (channel, message) => { | ||||||
|     const callbacks = subs[channel]; |     const callbacks = subs[channel]; | ||||||
| 
 | 
 | ||||||
|     log.silly(`New message on channel ${channel}`); |     log.silly(`New message on channel ${channel}`); | ||||||
| @ -127,8 +127,6 @@ const startWorker = (workerId) => { | |||||||
|     callbacks.forEach(callback => callback(message)); |     callbacks.forEach(callback => callback(message)); | ||||||
|   }); |   }); | ||||||
| 
 | 
 | ||||||
|   redisSubscribeClient.psubscribe(`${redisPrefix}timeline:*`); |  | ||||||
| 
 |  | ||||||
|   const subscriptionHeartbeat = (channel) => { |   const subscriptionHeartbeat = (channel) => { | ||||||
|     const interval = 6*60; |     const interval = 6*60; | ||||||
|     const tellSubscribed = () => { |     const tellSubscribed = () => { | ||||||
| @ -144,12 +142,20 @@ const startWorker = (workerId) => { | |||||||
|   const subscribe = (channel, callback) => { |   const subscribe = (channel, callback) => { | ||||||
|     log.silly(`Adding listener for ${channel}`); |     log.silly(`Adding listener for ${channel}`); | ||||||
|     subs[channel] = subs[channel] || []; |     subs[channel] = subs[channel] || []; | ||||||
|  |     if (subs[channel].length === 0) { | ||||||
|  |       log.verbose(`Subscribe ${channel}`); | ||||||
|  |       redisSubscribeClient.subscribe(channel); | ||||||
|  |     } | ||||||
|     subs[channel].push(callback); |     subs[channel].push(callback); | ||||||
|   }; |   }; | ||||||
| 
 | 
 | ||||||
|   const unsubscribe = (channel, callback) => { |   const unsubscribe = (channel, callback) => { | ||||||
|     log.silly(`Removing listener for ${channel}`); |     log.silly(`Removing listener for ${channel}`); | ||||||
|     subs[channel] = subs[channel].filter(item => item !== callback); |     subs[channel] = subs[channel].filter(item => item !== callback); | ||||||
|  |     if (subs[channel].length === 0) { | ||||||
|  |       log.verbose(`Unsubscribe ${channel}`); | ||||||
|  |       redisSubscribeClient.unsubscribe(channel); | ||||||
|  |     } | ||||||
|   }; |   }; | ||||||
| 
 | 
 | ||||||
|   const allowCrossDomain = (req, res, next) => { |   const allowCrossDomain = (req, res, next) => { | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user