From cf265cbed048b9324aea4b0bd591dbb34fd6c07b Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Wed, 4 Jan 2023 18:09:10 +0300 Subject: ActivityPub: refactor delivery to followers --- src/main/java/com/juick/ActivityPubManager.java | 144 ++++++++++++------------ 1 file changed, 72 insertions(+), 72 deletions(-) (limited to 'src') diff --git a/src/main/java/com/juick/ActivityPubManager.java b/src/main/java/com/juick/ActivityPubManager.java index 9dc6a4f0..a487afb1 100644 --- a/src/main/java/com/juick/ActivityPubManager.java +++ b/src/main/java/com/juick/ActivityPubManager.java @@ -35,6 +35,7 @@ import com.juick.util.MessageUtils; import com.juick.util.formatters.PlainTextFormatter; import com.juick.www.api.SystemActivity.ActivityType; import com.juick.www.api.activity.helpers.ProfileUriBuilder; +import com.juick.www.api.activity.model.Activity; import com.juick.www.api.activity.model.Context; import com.juick.www.api.activity.model.activities.*; import com.juick.www.api.activity.model.objects.*; @@ -172,21 +173,15 @@ public class ActivityPubManager implements ActivityListener, NotificationListene Message object = event.getMessage(); User user = event.getUser(); Actor me = conversionService.convert(user, Actor.class); - socialService.getFollowers(user).forEach(acct -> { - try { - Actor follower = (Actor) signatureManager.getContext(URI.create(acct)).orElseThrow(); - Update update = new Update(); - var note = makeNote(object); - update.setId(note.getId() + "#update"); - update.setActor(me.getId()); - update.setObject(note); - update.setPublished(Instant.now()); - logger.info("Update to follower {}", follower.getId()); - signatureManager.post(me, follower, update); - } catch (Exception e) { - logger.warn("{} exception", acct, e); - } - }); + Update update = new Update(); + var note = makeNote(object); + update.setId(note.getId() + "#update"); + update.setActor(me.getId()); + update.setObject(note); + update.setTo(Collections.singletonList(Context.ACTIVITYSTREAMS_PUBLIC)); + update.setPublished(Instant.now()); + logger.info("{} sends note update to followers", me.getId()); + activityToFollowers(user, me, update); } @Override @@ -199,31 +194,45 @@ public class ActivityPubManager implements ActivityListener, NotificationListene } } + private void activityToFollowers(User user, Actor from, Activity activity) { + socialService.getFollowers(user).forEach(acct -> { + var context = signatureManager.getContext(URI.create(acct)); + context.ifPresentOrElse((follower) -> { + if (follower instanceof Actor) { + var to = (Actor) follower; + try { + signatureManager.post(from, to, activity); + } catch (Exception e) { + logger.warn("Delivery to {} failed: {}", to, e.getMessage()); + } + } else { + try { + logger.warn("Unknown actor: {}", jsonMapper.writeValueAsString(follower)); + } catch (JsonProcessingException e) { + logger.warn("Invalid JSON: {}", acct); + } + } + }, () -> { + logger.warn("Context not verified: {}", acct); + }); + }); + } + @Override public void processUpdateUserEvent(UpdateUserEvent event) { User user = event.getUser(); String userUri = profileUriBuilder.personUri(user); Actor me = conversionService.convert(user, Actor.class); - socialService.getFollowers(user).forEach(acct -> { - try { - var context = signatureManager.getContext(URI.create(acct)); - if (context.isPresent() && context.get() instanceof Actor follower) { - Update update = new Update(); - update.setId(userUri + "#update"); - update.setActor(me.getId()); - update.setObject(me); - update.setPublished(Instant.now()); - logger.info("Update to follower {}", follower.getId()); - signatureManager.post(me, follower, update); - } else { - logger.warn("Unhandled context: {}", acct); - } - } catch (Exception e) { - logger.warn("activitypub exception", e); - } - }); + Update update = new Update(); + update.setId(userUri + "#update"); + update.setActor(me.getId()); + update.setObject(me); + update.setTo(Collections.singletonList(Context.ACTIVITYSTREAMS_PUBLIC)); + update.setPublished(Instant.now()); + logger.info("{} sends profile update to followers", me.getId()); + activityToFollowers(user, me, update); } - + private void processMessage(Message msg) { if (MessageUtils.isPM(msg) || msg.isService()) { return; @@ -243,20 +252,29 @@ public class ActivityPubManager implements ActivityListener, NotificationListene subscribers.forEach(acct -> { if (!acct.equals(profileUriBuilder.followersUri(user))) { var context = signatureManager.getContext(URI.create(acct)); - if (context.isPresent() && context.get() instanceof Actor follower) { - Create create = new Create(); - create.setId(note.getId()); - create.setActor(me.getId()); - create.setPublished(note.getPublished()); - create.setObject(note); - try { - signatureManager.post(me, follower, create); - } catch (IOException | NoSuchAlgorithmException e) { - logger.warn("activitypub exception", e); + context.ifPresentOrElse((follower) -> { + if (follower instanceof Actor) { + var to = (Actor) follower; + Create create = new Create(); + create.setId(note.getId()); + create.setActor(me.getId()); + create.setPublished(note.getPublished()); + create.setObject(note); + try { + signatureManager.post(me, to, create); + } catch (IOException | NoSuchAlgorithmException e) { + logger.warn("Delivery to {} failed: {}", to, e.getMessage()); + } + } else { + try { + logger.warn("Unknown actor: {}", jsonMapper.writeValueAsString(follower)); + } catch (JsonProcessingException e) { + logger.warn("Invalid JSON: {}", acct); + } } - } else { - logger.warn("Unhandled context: {}", acct); - } + }, () -> { + logger.warn("Context not verified: {}", acct); + }); } }); } @@ -287,7 +305,8 @@ public class ActivityPubManager implements ActivityListener, NotificationListene attachment.setMediaType(HttpUtils.mediaType(msg.getAttachmentType())); note.setAttachment(Collections.singletonList(attachment)); } - note.setTags(msg.getTags().stream().map(t -> new Hashtag(profileUriBuilder.tagUri(t), t.getName())).collect(Collectors.toList())); + note.setTags(msg.getTags().stream().map(t -> new Hashtag(profileUriBuilder.tagUri(t), t.getName())) + .collect(Collectors.toList())); if (msg.getReplyToUri() != null && msg.getReplyToUri().toASCIIString().length() > 0) { Optional noteContext = signatureManager.getContext(msg.getReplyToUri()); if (noteContext.isPresent()) { @@ -346,30 +365,11 @@ public class ActivityPubManager implements ActivityListener, NotificationListene announce.setActor(profileUriBuilder.personUri(user)); announce.setTo(Collections.singletonList(Context.ACTIVITYSTREAMS_PUBLIC)); announce.setObject(note); - signatureManager.getContext(URI.create(announce.getActor())).ifPresentOrElse((ctx) -> { - if (ctx instanceof Actor) { - socialService.getFollowers(user).forEach(acct -> { - var follower = signatureManager.getContext(URI.create(acct)); - follower.ifPresentOrElse((person) -> { - if (person instanceof Actor) { - try { - logger.info("{} announcing {} to {}", user.getName(), message.getMid(), acct); - signatureManager.post((Actor) ctx, (Actor) person, announce); - } catch (IOException | NoSuchAlgorithmException e) { - logger.warn("activitypub exception", e); - } - } else { - logger.warn("Unhandled context: {}", acct); - } - }, () -> logger.warn("Follower not found: {}", acct)); - }); - } else { - logger.warn("Unhandled context: {}", announce.getActor()); - } - }, () -> { - logger.warn("Context not found: {}", announce.getActor()); - }); + var me = conversionService.convert(user, Actor.class); + logger.info("{} announcing {} to followers", user.getName(), message.getMid()); + activityToFollowers(user, me, announce); } + public User actorToUser(URI uri) throws HttpBadRequestException, JsonProcessingException { var context = signatureManager.getContext(uri); if (context.isPresent() && context.get() instanceof Actor actor) { @@ -381,7 +381,7 @@ public class ActivityPubManager implements ActivityListener, NotificationListene } return user; } else { - logger.warn("Unhandled context: {}", jsonMapper.writeValueAsString(context)); + logger.warn("Unhandled context: {}", uri); throw new HttpBadRequestException(); } } -- cgit v1.2.3